package com.fly.demo.tcp.service;

import java.io.IOException;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;

import com.fly.core.utils.JsonBeanUtils;
import com.fly.demo.tcp.entity.Command;

import lombok.extern.slf4j.Slf4j;

/**
 * 结果回调处理
 */
@Slf4j
@Service
public class ResultCallBack implements Observer
{
    Map<String, String> resultMap = new ConcurrentHashMap<>();
    
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    /**
     * 观察者接受数据处理
     */
    @Override
    public void update(Observable observable, Object msg)
    {
        log.info("### accept #### {}", msg);
        try
        {
            if (msg instanceof String)
            {
                String json = StringUtils.substringAfter((String)msg, "hello Command！");
                Command command = JsonBeanUtils.jsonToBean(json, Command.class);
                log.info("{}", command);
                resultMap.put(command.getCommandId(), json);
            }
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
        }
    }
    
    /**
     * 获取命令处理结果
     * 
     * @param commandId
     * @return
     */
    public String queryResult(String commandId)
    {
        return resultMap.get(commandId);
    }
    
    /**
     * 业务线程处理业务,DeferredResult可以通过任何线程来计算返回一个结果
     * 
     * @param deferredResult
     * @param commandId
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void processResult(DeferredResult<String> deferredResult, String commandId)
        throws InterruptedException, ExecutionException, TimeoutException
    {
        processResult02(deferredResult, commandId);
    }
    
    /**
     * 无超时设置，不推荐
     * 
     * @param deferredResult
     * @param commandId
     */
    void processResult01(DeferredResult<String> deferredResult, String commandId)
    {
        executorService.execute(() -> {
            int index = 0;
            while (!resultMap.containsKey(commandId) && index++ < 50)
            {
                try
                {
                    log.info("waitting......");
                    TimeUnit.MILLISECONDS.sleep(20);
                }
                catch (InterruptedException e)
                {
                }
            }
            String result = StringUtils.defaultString(resultMap.get(commandId), "响应超时，请重试");
            deferredResult.setResult(result);
        });
    }
    
    /**
     * 有超时设置（推荐）
     * 
     * 
     * @param deferredResult
     * @param commandId
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    private void processResult02(DeferredResult<String> deferredResult, String commandId)
        throws InterruptedException, ExecutionException, TimeoutException
    {
        Future<String> future = executorService.submit(() -> {
            int index = 0;
            while (!resultMap.containsKey(commandId) && index++ < 50)
            {
                log.info("waitting......");
                TimeUnit.MILLISECONDS.sleep(20);
            }
            return resultMap.get(commandId);
        });
        String result = StringUtils.defaultString(future.get(1000L, TimeUnit.MILLISECONDS), "响应超时，请重试");
        deferredResult.setResult(result);
    }
}
